[[connecting]]
= Connecting to Kafka

The following components are used to connect to Kafka:

* `KafkaAdmin` - see xref:kafka/configuring-topics.adoc[Configuring Topics]
* `ProducerFactory` - see xref:kafka/sending-messages.adoc[Sending Messages]
* `ConsumerFactory` - see xref:kafka/receiving-messages.adoc[Receiving Messages]

Starting with version 2.5, each of these extends `KafkaResourceFactory`.
This allows changing the bootstrap servers at runtime by adding a `Supplier<String>` to their configuration: `setBootstrapServersSupplier(() +++->+++ ...)`.
The supplier is called for each new connection to obtain the list of servers.
Consumers and Producers are generally long-lived, so existing clients keep their current connections until they are closed.
To close existing Producers, call `reset()` on the `DefaultKafkaProducerFactory`.
To close existing Consumers, call `stop()` (and then `start()`) on the `KafkaListenerEndpointRegistry` and/or `stop()` and `start()` on any other listener container beans.
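
As a minimal sketch of the supplier described above, the following plain-Java class holds the current server list in an `AtomicReference` so that it can be replaced at runtime.
The class name `SwitchableBootstrapServers` and its `switchTo()` method are hypothetical and not part of the framework; only `setBootstrapServersSupplier()`, shown in the trailing comment, comes from this section.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/**
 * Hypothetical helper (not a framework class): a thread-safe holder for the
 * bootstrap server list that can be handed to setBootstrapServersSupplier().
 */
final class SwitchableBootstrapServers implements Supplier<String> {

    private final AtomicReference<String> servers;

    SwitchableBootstrapServers(String initialServers) {
        this.servers = new AtomicReference<>(initialServers);
    }

    /** Returns the server list that is current at the time of the call. */
    @Override
    public String get() {
        return this.servers.get();
    }

    /** Replaces the server list; only connections created later see the change. */
    void switchTo(String newServers) {
        this.servers.set(newServers);
    }
}

// Wiring sketch (needs spring-kafka on the classpath, so it is left as a comment):
//   SwitchableBootstrapServers servers = new SwitchableBootstrapServers("host1:9092");
//   producerFactory.setBootstrapServersSupplier(servers);
//   consumerFactory.setBootstrapServersSupplier(servers);
```

After calling `switchTo()`, existing producers and consumers still have to be closed as described above before the new servers take effect.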

For convenience, the framework also provides an `ABSwitchCluster`, which supports two sets of bootstrap servers, one of which is active at any time.
Configure the `ABSwitchCluster` and add it to the producer and consumer factories, and the `KafkaAdmin`, by calling `setBootstrapServersSupplier()`.
When you want to switch, call `primary()` or `secondary()` and call `reset()` on the producer factory to establish new connection(s); for consumers, `stop()` and `start()` all listener containers.
When using `@KafkaListener`+++s+++, `stop()` and `start()` the `KafkaListenerEndpointRegistry` bean.

See the Javadocs for more information.

[[factory-listeners]]
== Factory Listeners

Starting with version 2.5, the `DefaultKafkaProducerFactory` and `DefaultKafkaConsumerFactory` can be configured with a `Listener` to receive notifications whenever a producer or consumer is created or closed.

.Producer Factory Listener
[source, java]
----
interface Listener<K, V> {

    default void producerAdded(String id, Producer<K, V> producer) {
    }

    default void producerRemoved(String id, Producer<K, V> producer) {
    }

}
----

.Consumer Factory Listener
[source, java]
----
interface Listener<K, V> {

    default void consumerAdded(String id, Consumer<K, V> consumer) {
    }

    default void consumerRemoved(String id, Consumer<K, V> consumer) {
    }

}
----

In each case, the `id` is created by appending the `client-id` property (obtained from the client's `metrics()` after creation) to the factory `beanName` property, separated by `.`.
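
The composition rule above can be sketched in a few lines of plain Java.
The class `ListenerIds` and the sample values are hypothetical and only illustrate the described format; this is not the framework's own code.

```java
/** Hypothetical illustration of the listener id format: beanName + "." + client-id. */
final class ListenerIds {

    private ListenerIds() {
    }

    /** Joins the factory bean name and the client id with a dot. */
    static String listenerId(String beanName, String clientId) {
        return beanName + "." + clientId;
    }
}
```

Under this assumption, a factory bean named `myProducerFactory` whose client reports the `client-id` `producer-1` would pass `myProducerFactory.producer-1` to the listener.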

These listeners can be used, for example, to create and bind a Micrometer `KafkaClientMetrics` instance when a new client is created (and close it when the client is closed).

The framework provides listeners that do exactly that; see xref:kafka/micrometer.adoc#micrometer-native[Micrometer Native Metrics].
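
For other uses, a listener typically needs some bookkeeping keyed by the `id`.
The following is a minimal sketch of such bookkeeping in plain Java; `ClientRegistry` and its methods are hypothetical names, and a real listener would call them from `producerAdded()`/`producerRemoved()` or `consumerAdded()`/`consumerRemoved()`.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical registry (not a framework class) that a factory listener
 * could delegate to in order to track which clients are currently open.
 */
final class ClientRegistry<C> {

    private final Map<String, C> active = new ConcurrentHashMap<>();

    /** Call from the listener's "added" callback. */
    void added(String id, C client) {
        this.active.put(id, client);
    }

    /** Call from the listener's "removed" callback; removes only a matching entry. */
    void removed(String id, C client) {
        this.active.remove(id, client);
    }

    /** Returns a sorted snapshot of the ids of all clients currently tracked. */
    Set<String> activeIds() {
        return new TreeSet<>(this.active.keySet());
    }
}
```

A `ConcurrentHashMap` is used here on the assumption that clients may be created and closed on different threads.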

[[default-client-id-prefixes]]
== Default client ID prefixes

Starting with version 3.2, for Spring Boot applications that define an application name using the `spring.application.name` property, this name is used as a default prefix for auto-generated client IDs for these client types:

- consumer clients that do not use a consumer group
- producer clients
- admin clients

This makes it easier to identify these clients on the server side for troubleshooting or applying quotas.

.Example client IDs for a Spring Boot application with `spring.application.name=myapp`
[%autowidth]
|===
|Client Type |Without application name |With application name

|consumer without consumer group
|consumer-null-1
|myapp-consumer-1

|consumer with consumer group "mygroup"
|consumer-mygroup-1
|consumer-mygroup-1

|producer
|producer-1
|myapp-producer-1

|admin
|adminclient-1
|myapp-admin-1
|===

